-- Compute the remaining concurrency slots of every unpaused work pool that
-- defines a concurrency limit. Output is one row per pool (id, available_slots).
-- Pools without a limit, and pools that own no queues, produce no row here.
--
-- Active (RUNNING / PENDING) runs are counted across ALL queues of the pool,
-- paused ones included. A run that is already active in a queue that was
-- paused afterwards is still active, so leaving it out would overstate the
-- free capacity and let the pool exceed its concurrency limit.
WITH pool_slots AS (
    SELECT
        wp.id,
        -- clamp at zero so an over-subscribed pool never reports negative slots
        GREATEST(0, wp.concurrency_limit - COUNT(fr.id)) AS available_slots
    FROM
        work_pool wp
        INNER JOIN work_queue wq ON wq.work_pool_id = wp.id
        -- the state filter lives in ON (not WHERE) so queues with no active
        -- runs keep a NULL-extended row and COUNT(fr.id) yields zero for them
        LEFT JOIN flow_run fr ON fr.work_queue_id = wq.id
            AND fr.state_type IN ('RUNNING', 'PENDING')
    WHERE
        wp.is_paused IS FALSE
        AND wp.concurrency_limit IS NOT NULL
    GROUP BY
        wp.id
),

-- Remaining concurrency slots per work queue.
-- Yields one row (id, available_slots) for each unpaused queue that defines a
-- concurrency limit; queues without a limit are absent from this CTE.
queue_slots AS (
    SELECT
        q.id,
        -- never report fewer than zero free slots
        GREATEST(q.concurrency_limit - COUNT(active_run.id), 0) AS available_slots
    FROM work_queue AS q
    -- state filter is part of the join condition so that idle queues are kept
    -- and simply count zero active runs
    LEFT JOIN flow_run AS active_run
        ON active_run.work_queue_id = q.id
        AND active_run.state_type IN ('PENDING', 'RUNNING')
    WHERE
        q.concurrency_limit IS NOT NULL
        AND q.is_paused IS FALSE
    GROUP BY q.id
)

-- Pick the scheduled flow runs to hand out, capped per queue, per pool and
-- overall. Result columns are run_work_pool_id, run_work_queue_id, followed by
-- every flow_run column and work_queue_priority.
SELECT
    pool.id AS run_work_pool_id,
    pool_runs.work_queue_id AS run_work_queue_id,
    pool_runs.*
FROM work_pool AS pool
-- outer join, so a pool with no pool_slots row carries a NULL available_slots
LEFT JOIN pool_slots AS ps
    ON ps.id = pool.id

-- Lateral subquery evaluated once per work pool, which lets each pool get
-- its own row cap derived from its available concurrency.
CROSS JOIN LATERAL (
    SELECT
        queue_runs.*,
        queue.priority AS work_queue_priority
    FROM work_queue AS queue
    -- outer join, so a queue with no queue_slots row carries a NULL available_slots
    LEFT JOIN queue_slots AS qs
        ON qs.id = queue.id

    -- Lateral subquery evaluated once per work queue, which lets each queue
    -- get its own row cap derived from its available concurrency.
    CROSS JOIN LATERAL (
        SELECT run.*
        FROM flow_run AS run
        WHERE
            run.state_type = 'SCHEDULED'
            AND run.work_queue_id = queue.id
            {% if scheduled_after %}
            AND run.next_scheduled_start_time >= :scheduled_after
            {% endif %}
            {% if scheduled_before %}
            AND run.next_scheduled_start_time <= :scheduled_before
            {% endif %}
        -- earliest scheduled runs first
        ORDER BY run.next_scheduled_start_time ASC
        -- per-queue cap is the smaller of the queue_limit bind value and the free queue slots
        -- NOTE(review): relies on LEAST skipping NULL arguments (PostgreSQL
        -- behaviour) for queues without a slot row - confirm target dialect
        LIMIT LEAST(:queue_limit, qs.available_slots)
        -- row-lock the chosen runs, skipping rows already locked elsewhere
        FOR UPDATE SKIP LOCKED
    ) AS queue_runs

    WHERE
        queue.is_paused IS FALSE
        AND queue.work_pool_id = pool.id
        {% if work_queue_ids %}
        -- optional restriction to specific work queue IDs
        AND queue.id IN :work_queue_ids
        {% endif %}

    ORDER BY
        {% if respect_queue_priorities %}
        -- optional ordering by work queue priority ahead of start time
        queue.priority ASC,
        {% endif %}
        queue_runs.next_scheduled_start_time ASC

    -- per-pool cap is the smaller of the worker_limit bind value and the free pool slots
    LIMIT LEAST(:worker_limit, ps.available_slots)
) AS pool_runs

WHERE
    pool.is_paused IS FALSE
    {% if work_pool_ids %}
    -- optional restriction to specific work pool IDs
    AND pool.id IN :work_pool_ids
    {% endif %}

ORDER BY
    {% if respect_queue_priorities %}
    pool_runs.work_queue_priority ASC,
    {% endif %}
    pool_runs.next_scheduled_start_time ASC

-- overall cap on the number of runs returned
LIMIT :limit